# --- Environment setup: imports, version prints, working directories, IPython magics ---
import os, sys, cv2
import numpy as np
import pandas as pd
pd.plotting.register_matplotlib_converters() # datetime convert
import datetime, json
import dill as pickle  # dill (not stdlib pickle) so closures/lambdas can be serialized later
# Directory for intermediate data backups (pickled Spark result sets).
pickle_root = os.path.realpath(os.path.join('.', '_archive'))
if not os.path.exists(pickle_root): os.makedirs(pickle_root)
# Directory for trained model artifacts (tuner, scalers, hyperparameters, weights).
model_root = os.path.realpath(os.path.join('.', 'model'))
if not os.path.exists(model_root): os.makedirs(model_root)
from pprint import pprint
from IPython.core.display import display, HTML
import scipy ; print("scipy v." + str(scipy.__version__))
from scipy.fft import rfft, rfftfreq # Fast Fourier Transform on Real input
import scipy.stats as stats
import matplotlib.pyplot as plt
plt.rcParams.update({'font.size': 12})
# IPython magics: inline plots + auto-reload of edited local library modules.
%matplotlib inline
%reload_ext autoreload
%autoreload 2
from my_TS_Anomaly_lib.utils import millify, strfdelta, print_timedelta
import pymongo ; print('{:20s}'.format("pymongo v." + str(pymongo.version)), end='- ')
import findspark ; findspark.init()  # locate the Spark installation before importing pyspark
import pyspark ; print("pyspark v." + str(pyspark.version.__version__))
import tensorflow ; print('{:20s}'.format("tensorflow v." + str(tensorflow.version.VERSION)), end='- ')
import tensorboard ; print("tensorboard v." + str(tensorboard.__version__.__str__()))
This NASA data repository gathers a collection of datasets donated by various universities, agencies, and companies.
- University of Cincinnati, entitled Wavelet filter-based weak signature detection method and its application on rolling element bearing prognostics
.
|
|
|
|
|
|
REMARK : for 'test 1', 8 sensors were used (2 per bearing), whereas for both 'test 2' and 'test 3' only 4 sensors were deemed sufficient to be mounted on the test bench (1 per bearing).
In this companion notebook
, we automate the download of the data from the NASA repository and load it into a Mongo DB database. The source data is provided as a compressed set of very numerous files, each consisting in tens of thousands time-slices of 1 second sensors recordings at a sampling rate of 20 kHz. We use multiprocessing to speedup the data dumps to Mongo DB.
from pymongo import MongoClient
# Read-only credentials of the local Mongo DB holding the NASA IMS bearing data.
username = 'readOnlyUser' ; password = 'readOnlyUser_password'
client = MongoClient('mongodb://%s:%s@127.0.0.1' % (username, password)
                     , appname='Abnormal Vibration Watchdog')
# Peek at 2 sample documents of the selected test run.
# NOTE(review): 'test_id' is set in an earlier notebook cell not shown here.
cursor = client.nasa_ims_database.measurements.find(
    {'test_id': test_id}, limit=2)
for document in cursor : pprint(document)
REMARK : Once the data downloaded, we can follow this link to the README file that comes alongside it :
. This README provides further details on the experiment that produced the dataset, beyond what was introduced in §1.
from my_TS_Anomaly_lib.nasa_bearing_ims_mongo import get_test_first_timestamps
# Fetch the first `timestamps_count` one-second measurements of one sensor.
# NOTE(review): 'test_id', 'timestamps_count' and 'sensor_name' come from an
# earlier (hidden) notebook cell — confirm they are set before running this.
data = get_test_first_timestamps(
    mongo_client = client
    , database_name = 'nasa_ims_database'
    , test_id = test_id
    , timestamps_count = timestamps_count
    , sensor_name = sensor_name
)
# Unique measurement start times, oldest first (one subplot per timestamp below).
timestamps = pd.DatetimeIndex(data['timestamp'].unique()).sort_values(ascending = True)
# One subplot per retrieved measurement: plot the raw one-second signal.
fig, ax = plt.subplots(len(timestamps), 1, figsize=(15, 6), sharey = 'all')
fig.subplots_adjust(hspace = .5, wspace=.001)
ax = ax.ravel()
for i, timestamp in enumerate(timestamps) :
    data[data['timestamp'] == timestamp] \
        .plot('long_datetime', [sensor_name], ax = ax[i])
    ax[i].get_legend().remove() ; ax[i].set_xlabel(None)
    ax[i].set_xticks(ax[i].get_xticks()) # otherwise 'FixedFormatter' warning
    # label every tick with that subplot's single measurement timestamp
    ax[i].set_xticklabels([timestamp]*len(ax[i].get_xticklabels())
                          , rotation=0, ha='center')
fig.suptitle(str(timestamps_count) + " first sensory measurements " +
             "on sensor '" + sensor_name + "' for test #" + str(test_id) + "\n" +
             "(one second each)", fontsize=16)
fig.tight_layout() ; fig.subplots_adjust(top=.9) ; plt.show()
# Number of samples in one measurement (rows sharing one timestamp).
# NOTE(review): relies on the loop variable 'timestamp' leaking out of the
# previous cell's for-loop — any measurement would do, they all last 1 s.
measurement_steps_count = data[data['timestamp'] == timestamp].shape[0]
# sample frequencies of the measuring device (in Hertz)
# (in our case, each measurement lasts 1 second) =>
SAMPLE_RATE = measurement_steps_count
# frequency bins matching a real-input FFT of one full measurement
xf = rfftfreq(measurement_steps_count, 1 / SAMPLE_RATE)
# Spectral view: one FFT-magnitude subplot per one-second measurement,
# laid out on a 2-row grid (negated-round trick == ceiling division).
fig_rows_count = 2 ; fig_columns_count = -round(-(len(timestamps) / fig_rows_count))
fig, ax = plt.subplots(fig_rows_count, fig_columns_count, figsize=(15, 6), sharey = 'all')
ax = ax.ravel() ; fig.subplots_adjust(hspace = .1, wspace=.0)
for i, timestamp in enumerate(timestamps) :
    # real-input FFT of the raw signal of that one-second slice
    yf = rfft(data[data['timestamp'] == timestamp][sensor_name].values)
    ax[i].plot(xf, abs(yf)) ; ax[i].margins(x=0, y=0)
    ax[i].set_xticks(ax[i].get_xticks()) # otherwise 'FixedFormatter' warning
    xticks = [millify(xtick, True)+'Hz' for xtick in ax[i].get_xticks()]
    # only the leftmost column keeps an origin label, to avoid tick overlaps
    xticks[0] = '0' if i%fig_columns_count == 0 else ''
    ax[i].set_xticklabels(xticks, rotation=0, ha='right')
# BUGFIX: "Spectre" (French) -> "Spectrum" in the user-facing figure title.
fig.suptitle("Spectrum of the vibration signal recorded by '" + sensor_name + "' " +
             "for the " + str(timestamps_count) + " first measurements " +
             "of test #" + str(test_id), fontsize=16)
fig.subplots_adjust(top=.93) ; plt.show()
We will be interfacing our NASA IMS Measurements Mongo DB database collection thru the MONGODB SPARK CONNECTOR
. This will allow for distributed access and pre-processing of our data.
from pyspark import SparkContext
from pyspark.sql import SparkSession
from my_TS_Anomaly_lib.nasa_bearing_ims_spark import schema
# add the 'mongo-spark-connector' classpath info
# PRIOR to the JVM start on this notebook kernel
spark_jars_directory = os.path.realpath("C:/Users/Organization/.ivy2/jars/")
sys.path.insert(0, spark_jars_directory)
SUBMIT_ARGS = '--packages org.mongodb.spark:mongo-spark-connector_2.11:2.4.2 --driver-memory 2G pyspark-shell'
# - where "2.11" is our Scala version
#   (on which our Spark 2.4.4 is running)
# - where 2.4.2 is the mongodb-spark connector version
#   (compatible with our Spark 2.4.4 version)
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
## Start Spark (local mode, all available cores)
master = "local[*]" ; app_name = "Abnormal_Vibration_Watchdog"
sc = SparkContext(master = master, appName = app_name)
# NOTE(review): SparkSession(sc).builder.getOrCreate() instantiates a session
# then retrieves it back through the builder — works, but unconventional.
spark = SparkSession(sc).builder.getOrCreate()
display(HTML("<em>To monitor jobs & stages, go to the " +
             "<a href='" + spark.sparkContext.uiWebUrl + "/environment/'" +
             " target='_blank'>Spark UI</a> (on your Spark master host)</em>"))
database_name = 'nasa_ims_database'
# Optional server-side Mongo aggregation pipeline (currently a no-op;
# the commented variant would pre-filter on test_id at the source).
pipeline = "[]" # "[{ $match: {test_id: " + str(test_id) + "} }]"
df = (
    spark.read
    .format("com.mongodb.spark.sql.DefaultSource")
    .option("uri", "mongodb://" + username + ":" + password + "@127.0.0.1:27017")
    .option("database", database_name)
    .option("collection", "measurements")
    .schema(schema) # skips the (slow) "records sampling / schema retrieval" internals
    .option("batchSize", 70_000)
    .option("pipeline", pipeline)
    .load()
)
df.printSchema()
from pyspark.sql import functions as F
start_time = datetime.datetime.now()
# Rectify the vibration signal: replace each sensor reading by its magnitude,
# so the per-second mean below is a meaningful "vibration intensity"
# (raw readings are signed displacements).
# Loop over the 8 sensor columns instead of the original 8 copy-pasted
# withColumn / F.mean chains — identical resulting dataframes.
sensor_columns = ['sensor_' + str(i + 1) for i in range(8)]
df_abs = df
for sensor_column in sensor_columns :
    df_abs = df_abs.withColumn(sensor_column, F.abs(df_abs[sensor_column]))
# One row per (test run, one-second measurement): mean absolute amplitude per sensor.
df_aggregated = df_abs.groupBy(["test_id", "timestamp"]).agg(
    *[F.mean(sensor_column).alias(sensor_column) for sensor_column in sensor_columns]
)
# Mark for caching: this intermediate feeds several later pipeline stages.
silent = df_aggregated.cache()
print_timedelta(start_time)
REMARK : We notify Spark to mark this intermediary dataframe for caching, as it is an intermediate stage of our larger data transformation pipeline. That way, our Spark cluster will not need to re-process that stage when we later resume our pre-processing data pipelining.
start_time = datetime.datetime.now()
# Trigger the whole Spark pipeline and pull the aggregated rows onto the driver.
records = df_aggregated.rdd.collect() # initiate Spark computation
print_timedelta(start_time)
# Materialize as a pandas DataFrame, ordered chronologically within each test.
data = pd.DataFrame(
    records
    , columns = records[0].__fields__
).sort_values(by=['test_id', 'timestamp']).reset_index(drop=True)
# Start time of each test run (minimum timestamp per test_id).
test_start = data.groupby('test_id')['timestamp'].min()
print(test_start.to_string())
# Elapsed time since the start of each row's own test run.
# Vectorized (Series.map + datetime subtraction) instead of the original
# row-by-row itertuples append loop — same timedelta values, O(n) in pandas.
data['relative_timestamp'] = data['timestamp'] - data['test_id'].map(test_start)
from my_TS_Anomaly_lib.nasa_bearing_ims_spark import test_plot
# Side-by-side overview of the full sensor timeline of each of the 3 test runs.
fig, ax = plt.subplots(nrows = 1, ncols = 3, sharex='all', sharey='all', figsize=(12, 6), dpi=80)
test_plot(data, test_id = 1, ax = ax[0])
test_plot(data, test_id = 2, ax = ax[1])
test_plot(data, test_id = 3, ax = ax[2])
ax[0].legend() ; ax[0].set_ylabel('Amplitude (one-second average absolute)')
fig.suptitle('Bearing Sensor Data', fontsize=16)
fig.tight_layout() ; fig.subplots_adjust(top=.94) ; plt.show()
.
). :
from my_TS_Anomaly_lib.nasa_bearing_ims_spark import get_sensors_capped_sql_statement
# Register the aggregated dataframe, filtered to the training test run, as a
# temp view so Spark SQL can query it.
# NOTE(review): 'train_test_id' and 'save_last_days_count' are set in a hidden cell.
df_aggregated.filter(df_aggregated['test_id'] == train_test_id).createOrReplaceTempView("test_"+str(train_test_id)+"_agg_tab")
sql_statement = get_sensors_capped_sql_statement(spark, train_test_id, save_last_days_count)
#print(sql_statement)
query_resultset = spark.sql(sql_statement)
# Rename the result columns; cache since this feeds the scaler fit AND transform.
capped_df = \
    query_resultset.toDF('timestamp', 'sensor_1', 'sensor_2', 'sensor_3', 'sensor_4')
silent = capped_df.cache()
#concatenated_df.explain() ; #print(str(concatenated_df))
from pyspark.ml.feature import VectorAssembler, MinMaxScaler
from pyspark.ml import Pipeline
# Assemble the 4 sensor columns into one vector column, then min-max scale it.
columns_to_scale = ['sensor_1', 'sensor_2', 'sensor_3', 'sensor_4']
assembler = VectorAssembler(inputCols=columns_to_scale, outputCol="features", handleInvalid="keep")
scaler = MinMaxScaler(inputCol='features', outputCol='scaledFeatures')
pipeline = Pipeline(stages = [assembler] + [scaler])
# initiate Spark computation (eager transformation)
start_time = datetime.datetime.now()
scalerModel = pipeline.fit(capped_df) # fitting the scalers
print_timedelta(start_time)
# Per-sensor fitted ranges (stage [1] of the fitted pipeline is the MinMaxScaler).
originalMins = scalerModel.stages[1].originalMin ; originalMaxs = scalerModel.stages[1].originalMax
#print("MinMaxScaler (sensor_1) : " + str(originalMins[0]) + " ; " + str(originalMaxs[0]))
def create_scaler_func(originalMin, originalMax):
    """Build a min-max scaling function frozen onto the given original range.

    Returns a closure mapping values from [originalMin, originalMax] onto
    [0, 1]; values outside the fitted range extrapolate linearly.
    """
    def sensor_abs_mean_measurements_scaler(Xreal):
        """Scale raw sensor ".abs().mean()" measurements to the fitted range."""
        return (Xreal - originalMin) / (originalMax - originalMin)
    return sensor_abs_mean_measurements_scaler
#create_scaler_func(originalMins[0], originalMaxs[0])([.092, .025]) # scaler to be used on sensor 1 ".abs().avg()" data
# Persist one standalone scaler closure per sensor (dill can pickle closures);
# protocol=3 keeps the files loadable by any Python 3 runtime.
for i in range(4) :
    with open(os.path.join(model_root, 'sensor_'+str(i+1)+'_scaler.pickle'), 'wb') as file_pi:
        pickle.dump(create_scaler_func(originalMins[i], originalMaxs[i]), file_pi, protocol=3)
from my_TS_Anomaly_lib.nasa_bearing_ims_spark import vector_row_element
# splitting the scaledFeatures vector (of 4-elements rows)
# back into one scalar 'scaled_sensor_i' column per sensor
df_scaled = \
    scalerModel.transform(capped_df).select(
        ["timestamp"] +
        ["sensor_"+str(i+1) for i in range(4)] +
        [
            vector_row_element("scaledFeatures", F.lit(i)).alias("scaled_sensor_"+str(i+1))
            for i in range(4)
        ]
    )
silent = df_scaled.cache()
# initiate Spark computation
start_time = datetime.datetime.now()
scaled_records = df_scaled.rdd.collect()
print_timedelta(start_time)
# Collected scaled rows -> pandas, for plotting.
scaled_data = pd.DataFrame(
    scaled_records
    , columns = scaled_records[0].__fields__
).reset_index(drop=True)
fig, ax = plt.subplots(figsize = (16, 3))
plt.title('Scaled Bearings Vibration Displacements\n' +
          '(absolute mean over 1-second long measurements)'
          , fontsize=16)
scaled_data.plot(x = 'timestamp', y = ['scaled_sensor_1', 'scaled_sensor_2'
                                       , 'scaled_sensor_3', 'scaled_sensor_4']
                 , alpha = 0.4, ax = ax)
plt.legend(ncol=2) ; plt.show()
#ax.set_ylabel('Vibration Amplitude\n(one-second average absolute - SCALED)')
Since we're done with it, we can now clean the Spark cache from the objects we had marked as persistent along the way =>
# Release Spark resources: drop the temp view, evict the cached dataframes,
# then shut the session down (nothing below needs the cluster anymore).
silent = spark.catalog.dropTempView("test_"+str(train_test_id)+"_agg_tab")
silent = df_aggregated.unpersist()
silent = capped_df.unpersist()
silent = df_scaled.unpersist()
spark.stop()
backup :
# Persist the collected result sets so later sessions can resume
# without re-running the whole Spark pipeline.
with open(os.path.join(pickle_root, 'records.pickle'), 'wb') as file_pi:
    pickle.dump(records, file_pi)
with open(os.path.join(pickle_root, 'scaled_records.pickle'), 'wb') as file_pi:
    pickle.dump(scaled_records, file_pi)
reload :
# Restore the collected result sets persisted by a previous session.
with open(os.path.join(pickle_root, 'records.pickle'), 'rb') as file_pi:
    records = pickle.load(file_pi)
with open(os.path.join(pickle_root, 'scaled_records.pickle'), 'rb') as file_pi:
    scaled_records = pickle.load(file_pi)
def create_sequences(X, y, time_steps=TIME_STEPS):
    """Slice two aligned dataframes into overlapping sliding windows.

    Parameters
    ----------
    X, y : pandas.DataFrame
        Aligned input / target series (identical here, for an autoencoder).
    time_steps : int, default TIME_STEPS (module-level constant)
        Window length; one window starts at every row of X, so the result
        holds ``len(X) - time_steps`` windows.

    Returns
    -------
    (numpy.ndarray, numpy.ndarray)
        Arrays of shape ``(len(X) - time_steps, time_steps, n_features)``.
    """
    # Comprehensions instead of the original append loop — same windows.
    # NOTE: both ranges are driven by len(X), as in the original.
    Xs = np.array([X.iloc[i:(i + time_steps)].values
                   for i in range(len(X) - time_steps)])
    ys = np.array([y.iloc[i:(i + time_steps)].values
                   for i in range(len(X) - time_steps)])
    return Xs, ys
# Build the training windows (input == target, autoencoder setting).
X_train, y_train = create_sequences(scaled_data[['scaled_'+sensor_name]], scaled_data[['scaled_'+sensor_name]])
# BUGFIX: 'didn''t' was implicit string concatenation yielding "didnt";
# use a double-quoted string so the apostrophe survives.
assert ~(X_train != y_train).any(), "the input sequencing method didn't provide outputs identical to inputs"
print(f'Training input shape: {X_train.shape}')
,
StackExchange
.
. If you're impatient to see what the resulting tuned model looks like, you can skip the optimization sections and directly jump to §4.3. Best Tuned Model.
REMARK : the output of the "encoder_lstm_tail" layer from the above figure is the "encoding" of the input signal. We feed our Encoder a record of shape "time_steps x features_count" and get a (flat) vector of length "lstm_units". Our Encoder does "encode" a time sequence into a (flat) 1D vector. This Encoding is often used as a dimension-reduced representation of a time sequence. It is indeed demonstrably a "signature" to the time-sequence, since even our Decoder here can reconstruct the input time-sequence using only the encoding as input.
from my_TS_Anomaly_lib.bayesian_optimization import KerasTunerBayesianOptimization
from my_TS_Anomaly_lib.model import autoencoder_model
# One tuner working directory per sensor; one sub-project per session timestamp.
tuner_root = os.path.join('.', 'tuner', sensor_name)
timestamp = datetime.datetime.now().strftime("%m%d%H%M") # not too long, or tensorboard fails (dir path)
print("Session timestamp : " + str(timestamp))
# Bayesian hyperparameter search over the autoencoder architecture,
# scored on validation reconstruction loss.
# NOTE(review): 'max_trials' / 'executions_per_trial' are set in a hidden cell.
tuner = KerasTunerBayesianOptimization(
    autoencoder_model(time_steps = X_train.shape[1], features_count = X_train.shape[2])
    , objective = 'val_loss'
    , max_trials = max_trials
    , executions_per_trial = executions_per_trial
    , directory = tuner_root
    , project_name = timestamp
    , overwrite = True)
tuner.search_space_summary()
from my_TS_Anomaly_lib.jupyter_tensorboard_windows_launcher import JupyterTensorboardWindows
# Launch a TensorBoard process watching the tuner directory (Windows helper).
tb_launcher = JupyterTensorboardWindows( os.path.join('.', 'tuner') )
tb_launcher.run()
#On Windows, also view TensorBoard thru "http://localhost:6006/"
%reload_ext tensorboard
# embed a Tensorboard HTML page in an iframe in the below jupyter notebook cell (will only show when running)
%tensorboard --logdir {os.path.join('.', 'tuner').replace("\\", "\\\\")}
from tensorflow.keras.callbacks import ReduceLROnPlateau
# Decay the learning rate 10x after 10 stagnating epochs of the TRAINING loss.
learning_rate_reduction = ReduceLROnPlateau(monitor='loss' # purposely not 'val_loss', here
                                            , patience=10, factor=0.1
                                            , verbose=1)
from my_TS_Anomaly_lib.bayesian_optimization import TailoredTensorBoard
# BEWARE ! TensorBoard over GPU now requires https://github.com/tensorflow/tensorflow/issues/35860
tensorboard_root = os.path.join(tuner_root, timestamp, 'tb')
os.makedirs(tensorboard_root, exist_ok=True)
tensorboard_callback = TailoredTensorBoard(tensorboard_root)
silent = os.system('cls') # clear terminal
batch_size = 256
nb_epochs = 100
# Autoencoder target == input (X_train passed twice); 15% of the sequences
# are held out to provide the 'val_loss' search objective.
tuner.search(X_train, X_train
             , epochs=nb_epochs, batch_size=batch_size
             , validation_split=0.15
             , callbacks=[learning_rate_reduction, tensorboard_callback]
             )
from my_TS_Anomaly_lib.bayesian_optimization \
    import tuner_tensorboard_to_history_df, plot_tuner_models_histories
# Rebuild a per-trial training history dataframe from the TensorBoard event files.
history_df = tuner_tensorboard_to_history_df(tensorboard_root, verbose = 0)
plot_tuner_models_histories(history_df)
from my_TS_Anomaly_lib.bayesian_optimization import plot_tuner_models_performances
plot_tuner_models_performances(history_df)
from my_TS_Anomaly_lib.bayesian_optimization import tuner_trials_hparams_to_df
from my_TS_Anomaly_lib.utils import df_highlight_row
# Tabulate each trial's hyperparameters, highlighting the best (lowest val_loss).
trials_hp = tuner_trials_hparams_to_df(tuner_project_dir = os.path.join(tensorboard_root, '..'))
df_highlight_row(df = trials_hp.drop(['trial_id'], axis=1)
                 , row_index = trials_hp['val_loss (best)'].idxmin())
backup :
# Persist the whole trained tuner, plus (separately) the best model's
# hyperparameters (JSON) and weights (HDF5) for lightweight reloads.
with open(os.path.join(model_root, sensor_name+'_trained_tuner.pickle'), 'wb') as file_pi:
    pickle.dump(tuner, file_pi)
with open(os.path.join(model_root, sensor_name+'_hyperparameter.json')
          , 'w', encoding='utf-8') as json_file :
    json.dump(
        tuner.get_best_hyperparameters()[0].__dict__['values']
        , json_file, ensure_ascii=False, indent=4)
tuner.get_best_models()[0].save_weights(os.path.join(model_root, sensor_name+'_weights.hdf5'))
reload :
TIME_STEPS = 30 ; train_test_id = 3 ; sensor_name = 'sensor_1' ### <<<=== DEBUG - IGNORE THIS WHOLE CELL !!!!
max_trials = 12 ; executions_per_trial = 3
tuner_root = os.path.join('.', 'tuner', sensor_name)
timestamp = '12021315' ; tensorboard_root = os.path.join(tuner_root, timestamp, 'tb')
save_last_days_count = 4
# Reload the previously pickled tuner (dill handles the Keras internals).
with open(os.path.join(model_root, sensor_name+'_trained_tuner.pickle'), 'rb') as file_pi:
    tuner = pickle.load(file_pi)
# override "best_model" (if ever useful)
# NOTE(review): 'rebuild_model' is not defined anywhere in this file — this
# alternative reload path needs it imported/defined before it can run.
with open(os.path.join(model_root, sensor_name+'_hyperparameter.json')) as json_file:
    best_hyperparameters_dict = json.load(json_file)
best_model = rebuild_model(best_hyperparameters_dict) # compiled but untrained
#best_model.summary()
best_model.load_weights(os.path.join(model_root, sensor_name+'_weights.hdf5'))
# NOTE(review): the line below immediately replaces the model rebuilt above with
# the tuner's own best model — the weight-loading path above is effectively dead.
best_model = tuner.get_best_models()[0]
best_hps = tuner.get_best_hyperparameters()[0]
#best_model.summary()
from my_TS_Anomaly_lib.model import plot_autoencoder
# Render the best model's architecture diagram and save it as a PNG
# ('silent =' just swallows cv2.imwrite's boolean return in the notebook).
architecture_plot = plot_autoencoder(best_model)
silent = cv2.imwrite(os.path.join('.', 'images', 'architecture.png'), architecture_plot)
|
# plot the loss distribution of the training set
X_pred = best_model.predict(X_train)
# compute our model signal reconstruction loss (MeanAbsoluteError)
scored_train = pd.DataFrame()
scored_train['Loss_mae'] = np.mean(np.abs(X_pred-X_train), axis = 1).flatten()
# Anomaly threshold = worst reconstruction loss observed on (normal) training data.
threshold = scored_train['Loss_mae'].max()
plt.figure(figsize = (15, 3))
plt.title('Distribution of the Signal Reconstruction Loss\n' +
          '(training dataset, ' + sensor_name + ', normal bearing working conditions)'
          , fontsize=14)
n, x, _ = plt.hist(scored_train['Loss_mae'], bins = 200, density=True
                   , color = 'purple', alpha=0.4)
# Smooth (gaussian kernel density) estimate overlaid on the histogram.
density_fct = stats.gaussian_kde(scored_train['Loss_mae'])
plt.plot(x, density_fct(x), color = 'purple')
plt.xlabel('Mean Absolute Error') ; plt.ylabel('Timestamps (density)')
#plt.xlim([0,.003]) ;
plt.axvline(x=threshold, color='orange')
ymin, ymax = plt.ylim()
plt.annotate('Anomaly Loss Threshold', xy=(threshold, ymax), xytext=(-10, -15)
             , textcoords='offset points', rotation=90, va='top', ha='center'
             , annotation_clip=False, color='orange')
plt.show()
loss_distrib = scored_train.describe(percentiles=[.25, .5, .75, .98, .9995, .1]).T
display(loss_distrib)
# retrieve time series after the "saved last days" date
# (from the source dataset kept out of the training dataset)
test_start_datetime = \
    data['timestamp'][data['test_id'] == train_test_id] \
    .max() + pd.Timedelta(days=-save_last_days_count)
test_data = \
    data[(data['test_id'] == train_test_id)
         & (data['timestamp'] >= test_start_datetime)
         ][['timestamp', sensor_name]].reset_index(drop=True)
# apply scaling
# (reload the per-sensor min-max scaler closure fitted on the training data)
with open(os.path.join(model_root, sensor_name + '_scaler.pickle'), 'rb') as file_pi:
    sensor_scaler_fct = pickle.load(file_pi)
# Trailing zero-padding of TIME_STEPS values — presumably so every real test
# timestamp gets a full sliding window below; TODO confirm intent.
X_test = pd.DataFrame(sensor_scaler_fct(
    list(test_data[sensor_name].values) + ([0]*TIME_STEPS)
))
# create the input sequences
X_test, y_test = create_sequences(X_test, X_test)
print(f'Testing shape: {X_test.shape}')
# calculate the loss on the test set
# for each of the test timestamps
# (we do it here in one batch)
X_pred = best_model.predict(X_test)
# calculating the signal reconstruction loss timeline
scored_test = pd.DataFrame()
scored_test['Loss_mae'] = np.mean(np.abs(X_pred-X_test), axis = 1).flatten()
scored_test['Threshold'] = threshold
# Flag a timestamp anomalous when its window loss exceeds the training threshold;
# shift(TIME_STEPS) presumably re-aligns window scores in time — confirm.
scored_test['Anomaly'] = (scored_test['Loss_mae'] > scored_test['Threshold']).shift(TIME_STEPS)
scored_test.loc[:TIME_STEPS, 'Anomaly'] = False # moving window "past before start"
print("Detected " + str(scored_test['Anomaly'].sum()) +
      " abnormal bearing vibration timestamps in total.")
from my_TS_Anomaly_lib.model import plot_predictions
plot_predictions(test_data, scored_test)
# NOTE(review): index[0] raises IndexError if no anomaly was detected at all.
first_anomaly_timestamp = test_data.loc[scored_test[scored_test['Anomaly']].index[0], 'timestamp']
last_timestamp = pd.to_datetime(test_data[['timestamp']].iloc[-1].values[0], unit='D')
Export Notebook to HTML (with Markdown extension cells evaluated and hidden cell outputs omitted) :
import os ; from my_TS_Anomaly_lib.utils import get_notebook_fullname
# Print this notebook's full path (needed by the HTML export instructions).
get_notebook_fullname()
(to be launched from the conda env which holds the Jupyter installation, the one named 'r-tensorflow', in our personal case)
"Ctrl+D" (to exit python console mode)
conda activate r-tensorflow
python
import os ; os.getcwd()
os.chdir('D:\jupyter_notebooks\TimeSeries_Anomaly_Detection') # to set working directory
from my_TS_Anomaly_lib.jupyter_markdown_extension import md_extension_to_html
md_extension_to_html('D:\jupyter_notebooks\TimeSeries_Anomaly_Detection\main.ipynb')
CLEAR
# Free all Keras/TensorFlow graph state held by this kernel.
tensorflow.keras.backend.clear_session()